package net.gdface.worker;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import net.gdface.utils.Assert;
import net.gdface.utils.Judge;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 队列管理器接口实现<br>
 * 使用{@link DelayQueue}结合{@link java.util.concurrent.ArrayBlockingQueue}的特点,实现一个能控制任务开始时间的有界半阻塞队列管理器<br>
 * {@link #push(WorkData)}为阻塞添加方法<br>
 * {@link #push(WorkData, long)}为非阻塞添加方法<br>
 * 可以实现将队列大小控制在{@link #capacity}附近的范围，防止队列无限制扩张
 * 
 * @author guyadong
 *
 */
public class QueueManagerImpl<T extends WorkData> implements QueueManager<T> {

	protected static final Logger logger = LoggerFactory.getLogger(QueueManagerImpl.class);
	public static final long DEFAULT_INTERVAL_MILLS = 10 * 1000;
	/**
	 * 默认线程名字
	 */
	private static final String DEFAULT_NAME = "QM";
	/**
	 * 工作队列
	 */
	private final DelayQueue<T> workQueue = new DelayQueue<T>();
	/**
	 * 缺省的任务间隔时间
	 */
	private long defaultIntervalMills;
	private final String name;
	/**
	 * 队列容量，当容量大于此值时，执行{@link #push(WorkData)}方法的线程会阻塞<br>
	 * 但使用无阻塞方法 {@link #push(WorkData, long)}时，不受此值限制
	 */
	private final int capacity;
	private final ReentrantLock queueLock = new ReentrantLock();
	private final Condition notFull = queueLock.newCondition();
	private final Condition notEmpty= queueLock.newCondition();
	public QueueManagerImpl(long defaultIntervalMills, String name, int capacity) {
		this.defaultIntervalMills = defaultIntervalMills < 0 ? DEFAULT_INTERVAL_MILLS : defaultIntervalMills;
		this.name = Judge.isEmpty(name) ? DEFAULT_NAME : name;
		this.capacity = capacity;
		logger.info("workQueueSize [{}](工作队列大小)=[{}]",this.name,this.capacity);
	}

	@Override
	public final boolean isFinished() {
		return queueSize() == 0;
	}

	@Override
	public T pop(long timeout, TimeUnit unit) throws InterruptedException {
		long nanos = unit.toNanos(timeout);
		final ReentrantLock lock = this.queueLock;
		lock.lockInterruptibly();
		try {			
			while(true){
				T first = workQueue.peek();
                if (first == null||first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = notEmpty.awaitNanos(nanos);
                } else {
					T task = workQueue.poll();
					assert task == first;
					if (queueSize() < capacity)
						this.notFull.signal();
					return task;
                }
			}

		} finally {
			lock.unlock();
		}
	}

	@Override
	public void push(T task) throws InterruptedException {
		Assert.notNull(task, "task");
		final ReentrantLock lock = this.queueLock;
		lock.lockInterruptibly();
		try {
			while (queueSize() >= capacity)
				notFull.await();
			this.workQueue.add(task);
			this.notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public boolean push(T task, long delayMills) {
		Assert.notNull(task, "task");
		/*if(task instanceof WorkDataWithURI)
			logger.info("{} DELAY:{}", name, ((WorkDataWithURI) task).getURI());*/
		final ReentrantLock lock = this.queueLock;
		lock.lock();
		try {
			if (delayMills != 0)
				task.setDelay(delayMills < 0 ? this.defaultIntervalMills : delayMills);
			boolean ok = workQueue.add(task);
			this.notEmpty.signal();
			return ok;
		} finally {
			lock.unlock();
		}
	}

	@Override
	public int queueSize() {
		final ReentrantLock lock = this.queueLock;
		lock.lock();
		try {
			return workQueue.size();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * @param defaultIntervalMills
	 *            要设置的 defaultIntervalMills
	 */
	@Override
	public void setDefaultIntervalMills(long defaultIntervalMills) {
		this.defaultIntervalMills = defaultIntervalMills;
	}
}